#include "LoopThreadPool.h"
#include "LoopService.h"
#include "../common/ErrorCode.h"
#include <assert.h>



LoopThreadPool::LoopThreadPool(int thread_num/* = 1*/)
	: thread_num_(thread_num)
	, next_loop_id_(0)
	, status_(kNull)
	, thread_run_num_(0)
	, thread_stop_num_(0)
{
	assert(thread_num_ > 0);
	for(int i = 0; i < thread_num_; ++i){
		LoopService* loop = new LoopService;
		loops_.emplace_back(loop);
	}
}


LoopThreadPool::~LoopThreadPool()
{
	assert(status_.load() == kStopped);
	stop();
}

std::error_code LoopThreadPool::start()
{
	assert(status_.load() == kNull);
    if (status_.load() != kNull && status_.load() != kRunning) {
        return make_custom_error_code(custom_error::pool_bad_status);
    }
    
	status_.store(kRunning);
	for (auto loop : loops_)
	{
		auto precb = [this](){
			++thread_run_num_;
		};
		loop->post(precb);
		std::thread rth([this,loop](){
			loop->run();
			++thread_stop_num_;
		});
		rth.detach();
		threads_.emplace_back(std::move(rth));
	}
	return std::error_code();
}

void LoopThreadPool::stop()
{
	if(status_.load() == kStopped)
		return;

	status_.store(kStopped);
	for (auto loop : loops_)
	{
		loop->post([loop](){
			loop->stop();
			delete loop;
		});
	}
	while(thread_stop_num_ != loops_.size())
		std::this_thread::yield();
	loops_.clear();
	threads_.clear();
}

void LoopThreadPool::after_fork()
{
    for (auto lp : loops_) {
        lp->after_fork();
    }
}

LoopService* LoopThreadPool::next_loop()
{
	if(status_.load() == kStopped)
		return nullptr;
	return loops_[next_loop_id_++ % thread_num_];
}

LoopService* LoopThreadPool::find_loop_by_base(event_base* b)
{
    for (auto& lp : loops_) {
        if (lp->event_base() == b) {
            return lp;
        }
    }
    return nullptr;
}

